{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start the beam-flink job server"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "from hops import beam as hops_beam\n",
    "# Start Beam jobservice\n",
    "hops_beam.start(taskmanager_heap_size=8192)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "MV-CWK2kI-iH"
   },
   "source": [
    "## TFMA Notebook example\n",
    "\n",
    "This notebook describes how to export your model for TFMA and demonstrates the analysis tooling it offers.\n",
    "\n",
    "Note: Please make sure to follow the instructions in [README.md](https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi/README.md) when running this notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "MV-CWK2kI-iH"
   },
   "source": [
    "## Setup\n",
    "\n",
    "Import necessary packages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "xFbGgkXAJCJ7"
   },
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "from hops import beam as hops_beam\n",
    "import os\n",
    "from tfx.examples.chicago_taxi import preprocess\n",
    "import shutil\n",
    "import tensorflow as tf\n",
    "import tensorflow_data_validation as tfdv\n",
    "import tensorflow_model_analysis as tfma\n",
    "from google.protobuf import text_format \n",
    "from tensorflow.python.lib.io import file_io\n",
    "from tensorflow_transform.beam.tft_beam_io import transform_fn_io\n",
    "from tensorflow_transform.coders import example_proto_coder\n",
    "from tensorflow_transform.saved import saved_transform_io\n",
    "from tensorflow_transform.tf_metadata import dataset_schema\n",
    "from tensorflow_transform.tf_metadata import schema_utils\n",
    "from tfx.examples.chicago_taxi.trainer import task\n",
    "from tfx.examples.chicago_taxi.trainer import taxi\n",
    "from hops import hdfs as hopsfs\n",
    "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions, HadoopFileSystemOptions, PortableOptions, WorkerOptions, DebugOptions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "zpCt_emiJDeb"
   },
   "source": [
    "Helper functions and some constants for running the notebook locally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "1Axm8YxCJF7K"
   },
   "outputs": [],
   "source": [
    "BASE_DIR = hopsfs.project_path(exclude_nn_addr=True)\n",
    "DATA_DIR = os.path.join(BASE_DIR, 'Resources/data')\n",
    "OUTPUT_DIR = os.path.join(BASE_DIR, 'Resources/taxi_out')\n",
    "TMP_DIR = os.path.join(BASE_DIR, 'Resources/taxi_tmp')\n",
    "\n",
    "# Base dir containing train and eval data\n",
    "TRAIN_DATA_DIR = os.path.join(DATA_DIR, 'train')\n",
    "EVAL_DATA_DIR = os.path.join(DATA_DIR, 'eval')\n",
    "\n",
    "# Base dir where TFT writes training data\n",
    "TFT_TRAIN_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tft_train')\n",
    "TFT_TRAIN_FILE_PREFIX = 'train_transformed'\n",
    "\n",
    "# Base dir where TFT writes eval data\n",
    "TFT_EVAL_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tft_eval')\n",
    "TFT_EVAL_FILE_PREFIX = 'eval_transformed'\n",
    "\n",
    "TF_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tf')\n",
    "\n",
    "# Base dir where TFMA writes eval data\n",
    "TFMA_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tfma')\n",
    "\n",
    "SERVING_MODEL_DIR = 'serving_model_dir'\n",
    "EVAL_MODEL_DIR = 'eval_model_dir'\n",
    "\n",
    "\n",
    "def get_tft_train_output_dir(run_id):\n",
    "    return _get_output_dir(TFT_TRAIN_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "\n",
    "def get_tft_eval_output_dir(run_id):\n",
    "    return _get_output_dir(TFT_EVAL_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "\n",
    "def get_tf_output_dir(run_id):\n",
    "    return _get_output_dir(TF_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "def get_tfma_output_dir(run_id):\n",
    "    return _get_output_dir(TFMA_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "def _get_output_dir(base_dir, run_id):\n",
    "    return os.path.join(base_dir, 'run_' + str(run_id))\n",
    "\n",
    "def get_schema_file():\n",
    "    return os.path.join(OUTPUT_DIR, 'schema.pbtxt')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Configure the beam pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_args = hops_beam.get_portable_runner_config()\n",
    "options=PipelineOptions(flags=pipeline_args)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "kzLwrYqEJWcD"
   },
   "source": [
    "## Check evaluation data for errors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "NVA5Bor4JaxL"
   },
   "outputs": [],
   "source": [
    "# Compute stats over eval data.\n",
    "eval_stats = tfdv.generate_statistics_from_csv(data_location=os.path.join(EVAL_DATA_DIR, 'data.csv'), output_path=os.path.join(TMP_DIR,'eval_stats'), pipeline_options=options)"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "name": "chicago_taxi_tfma_local_playground.ipynb",
   "provenance": [],
   "version": "0.3.2"
  },
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}